{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "wJcYs_ERTnnI"
      },
      "source": [
        "##### Copyright 2021 The TensorFlow Authors."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "HMUDt0CiUJk9"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "77z2OchJTk0l"
      },
      "source": [
        "# Migrate multi-worker CPU/GPU training\n",
        "\n",
        "<table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://www.tensorflow.org/guide/migrate/multi_worker_cpu_gpu_training\">\n",
        "    <img src=\"https://www.tensorflow.org/images/tf_logo_32px.png\" />\n",
        "    View on TensorFlow.org</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/guide/migrate/multi_worker_cpu_gpu_training.ipynb\">\n",
        "    <img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\" />\n",
        "    Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/tensorflow/docs/blob/master/site/en/guide/migrate/multi_worker_cpu_gpu_training.ipynb\">\n",
        "    <img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" />\n",
        "    View source on GitHub</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a href=\"https://storage.googleapis.com/tensorflow_docs/docs/site/en/guide/migrate/multi_worker_cpu_gpu_training.ipynb\"><img src=\"https://www.tensorflow.org/images/download_logo_32px.png\" />Download notebook</a>\n",
        "  </td>\n",
        "</table>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "meUTrR4I6m1C"
      },
      "source": [
        "This guide demonstrates how to migrate your multi-worker distributed training workflow from TensorFlow 1 to TensorFlow 2.\n",
        "\n",
        "To perform multi-worker training with CPUs/GPUs:\n",
        "\n",
        "- In TensorFlow 1, you traditionally use the `tf.estimator.train_and_evaluate` and `tf.estimator.Estimator` APIs.\n",
        "- In TensorFlow 2, use the Keras APIs for writing the model, the loss function, the optimizer, and metrics. Then, distribute the training with Keras `Model.fit` API or a custom training loop (with `tf.GradientTape`) across multiple workers with `tf.distribute.experimental.ParameterServerStrategy` or `tf.distribute.MultiWorkerMirroredStrategy`. For more details, refer to the following tutorials:\n",
        "  - [Distributed training with TensorFlow](../../guide/distributed_training.ipynb)\n",
        "  - [Parameter server training with Keras Model.fit/a custom training loop](../../tutorials/distribute/parameter_server_training.ipynb)\n",
        "  - [MultiWorkerMirroredStrategy with Keras Model.fit](../../tutorials/distribute/multi_worker_with_keras.ipynb)\n",
        "  - [MultiWorkerMirroredStrategy with a custom training loop](../../tutorials/distribute/multi_worker_with_ctl.ipynb)."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YdZSoIXEbhg-"
      },
      "source": [
        "## Setup"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "28f46832b54d"
      },
      "source": [
        "Start with some necessary imports and a simple dataset for demonstration purposes:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "iE0vSfMXumKI"
      },
      "outputs": [],
      "source": [
        "# The notebook uses a dataset instance for `Model.fit` with\n",
        "# `ParameterServerStrategy`, which depends on symbols in TF 2.7.\n",
        "# Install a utility needed for this demonstration\n",
        "!pip install portpicker\n",
        "\n",
        "import tensorflow as tf\n",
        "import tensorflow.compat.v1 as tf1"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "m7rnGxsXtDkV"
      },
      "outputs": [],
      "source": [
        "features = [[1., 1.5], [2., 2.5], [3., 3.5]]\n",
        "labels = [[0.3], [0.5], [0.7]]\n",
        "eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]\n",
        "eval_labels = [[0.8], [0.9], [1.]]"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "T2uaw9QaDM_X"
      },
      "source": [
        "You will need the `'TF_CONFIG'` configuration environment variable for training on multiple machines in TensorFlow. Use `'TF_CONFIG'` to specify the `'cluster'` and the `'task'`s' addresses. (Learn more in the [Distributed_training](../...guide/distributed_training.ipynb) guide.)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "4OUzwoQgXgkG"
      },
      "outputs": [],
      "source": [
        "import json\n",
        "import os\n",
        "\n",
        "tf_config = {\n",
        "    'cluster': {\n",
        "        'chief': ['localhost:11111'],\n",
        "        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],\n",
        "        'ps': ['localhost:12121', 'localhost:13131'],\n",
        "    },\n",
        "    'task': {'type': 'chief', 'index': 0}\n",
        "}\n",
        "\n",
        "os.environ['TF_CONFIG'] = json.dumps(tf_config)"
      ]
    },
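    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To make the roles of `'cluster'` and `'task'` concrete, the following is a minimal sketch that uses only the Python standard library. The helper name `describe_task` is hypothetical (it is not a TensorFlow API), and the addresses are the same placeholder values used in the cell above:\n",
        "\n",
        "```python\n",
        "import json\n",
        "\n",
        "# Same shape as the `'TF_CONFIG'` value defined above.\n",
        "tf_config_json = json.dumps({\n",
        "    'cluster': {\n",
        "        'chief': ['localhost:11111'],\n",
        "        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],\n",
        "        'ps': ['localhost:12121', 'localhost:13131'],\n",
        "    },\n",
        "    'task': {'type': 'chief', 'index': 0}\n",
        "})\n",
        "\n",
        "def describe_task(tf_config_json):\n",
        "  # Hypothetical helper: returns (task type, task index, task address).\n",
        "  config = json.loads(tf_config_json)\n",
        "  task = config['task']\n",
        "  # `'cluster'` maps each job name to a list of addresses; `'task'` selects\n",
        "  # one entry of one of those lists for the current process.\n",
        "  address = config['cluster'][task['type']][task['index']]\n",
        "  return task['type'], task['index'], address\n",
        "\n",
        "task_type, task_index, address = describe_task(tf_config_json)\n",
        "print(task_type, task_index, address)\n",
        "```\n",
        "\n",
        "Every process in the cluster shares the same `'cluster'` entry; only the `'task'` entry differs from one process to another."
      ]
    },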
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "PbeoSbbmDdc0"
      },
      "source": [
        "Note: Unfortunately, since multi-worker training with `tf.estimator` APIs in TensorFlow 1 requires multiple clients (which would be especially tricky to be done here in this Colab notebook), you will make the notebook runnable without a `'TF_CONFIG'` environment variable, so it falls back to local training. (Learn more in the *Setting up the `'TF_CONFIG'` environment variable* section in the [Distributed training with TensorFlow](../../guide/distributed_training.ipynb) guide.)\n",
        "\n",
        "Use the `del` statement to remove the variable (but in real-world multi-worker training in TensorFlow 1, you won't have to do this):"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "AHuynAR5D8sU"
      },
      "outputs": [],
      "source": [
        "del os.environ['TF_CONFIG']"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4uXff1BEssdE"
      },
      "source": [
        "## TensorFlow 1: Multi-worker distributed training with tf.estimator APIs"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MpyINdiLEN3c"
      },
      "source": [
        "The following code snippet demonstrates the canonical workflow of multi-worker training in TF1: you will use a `tf.estimator.Estimator`, a `tf.estimator.TrainSpec`, a `tf.estimator.EvalSpec`, and the `tf.estimator.train_and_evaluate` API to distribute the training:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "lqe9obf7suIj"
      },
      "outputs": [],
      "source": [
        "def _input_fn():\n",
        "  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)\n",
        "\n",
        "def _eval_input_fn():\n",
        "  return tf1.data.Dataset.from_tensor_slices(\n",
        "      (eval_features, eval_labels)).batch(1)\n",
        "\n",
        "def _model_fn(features, labels, mode):\n",
        "  logits = tf1.layers.Dense(1)(features)\n",
        "  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)\n",
        "  optimizer = tf1.train.AdagradOptimizer(0.05)\n",
        "  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())\n",
        "  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)\n",
        "\n",
        "estimator = tf1.estimator.Estimator(model_fn=_model_fn)\n",
        "train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)\n",
        "eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)\n",
        "tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "KEmzBjfnsxwT"
      },
      "source": [
        "## TensorFlow 2: Multi-worker training with distribution strategies"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Syb66qsbEp1x"
      },
      "source": [
        "In TensorFlow 2, distributed training across multiple workers with CPUs, GPUs, and TPUs is done via `tf.distribute.Strategy`s.\n",
        "\n",
        "The following example demonstrates how to use two such strategies: `tf.distribute.experimental.ParameterServerStrategy` and `tf.distribute.MultiWorkerMirroredStrategy`, both of which are designed for CPU/GPU training with multiple workers.\n",
        "\n",
        "`ParameterServerStrategy` employs a _coordinator_ (`'chief'`), which makes it more friendly with the environment in this Colab notebook. You will be using some utilities here to set up the supporting elements essential for a runnable experience here: you will create an _in-process cluster_, where threads are used to simulate the parameter servers (`'ps'`) and workers (`'worker'`). For more information about parameter server training, refer to the [Parameter server training with ParameterServerStrategy](../../tutorials/distribute/parameter_server_training.ipynb) tutorial.\n",
        "\n",
        "In this example, first define the `'TF_CONFIG'` environment variable with a `tf.distribute.cluster_resolver.TFConfigClusterResolver` to provide the cluster information. If you are using a cluster management system for your distributed training, check if it provides `'TF_CONFIG'` for you already, in which case you don't need to explicitly set this environment variable. (Learn more in the *Setting up the `'TF_CONFIG'` environment variable* section in the [Distributed training with TensorFlow](../../guide/distributed_training.ipynb) guide.)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "rp-gFY0H5rF-"
      },
      "outputs": [],
      "source": [
        "# Find ports that are available for the `'chief'` (the coordinator),\n",
        "# `'worker'`s, and `'ps'` (parameter servers).\n",
        "import portpicker\n",
        "\n",
        "chief_port = portpicker.pick_unused_port()\n",
        "worker_ports = [portpicker.pick_unused_port() for _ in range(3)]\n",
        "ps_ports = [portpicker.pick_unused_port() for _ in range(2)]\n",
        "\n",
        "# Dump the cluster information to `'TF_CONFIG'`.\n",
        "tf_config = {\n",
        "    'cluster': {\n",
        "        'chief': [\"localhost:%s\" % chief_port],\n",
        "        'worker': [\"localhost:%s\" % port for port in worker_ports],\n",
        "        'ps':  [\"localhost:%s\" % port for port in ps_ports],\n",
        "    },\n",
        "    'task': {'type': 'chief', 'index': 0}\n",
        "}\n",
        "os.environ['TF_CONFIG'] = json.dumps(tf_config)\n",
        "\n",
        "# Use a cluster resolver to bridge the information to the strategy created below.\n",
        "cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()"
      ]
    },
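    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a quick sanity check, the resolver can be inspected before it is handed to a strategy. The following is a minimal, standalone sketch; the fixed port numbers are placeholders chosen for illustration, not the ports picked by `portpicker` above:\n",
        "\n",
        "```python\n",
        "import json\n",
        "import os\n",
        "\n",
        "import tensorflow as tf\n",
        "\n",
        "# Placeholder addresses for illustration only.\n",
        "os.environ['TF_CONFIG'] = json.dumps({\n",
        "    'cluster': {\n",
        "        'chief': ['localhost:11111'],\n",
        "        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],\n",
        "        'ps': ['localhost:12121', 'localhost:13131'],\n",
        "    },\n",
        "    'task': {'type': 'chief', 'index': 0}\n",
        "})\n",
        "\n",
        "resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
        "cluster_spec = resolver.cluster_spec()\n",
        "\n",
        "# The role of the current process, read from the `'task'` entry.\n",
        "print(resolver.task_type, resolver.task_id)\n",
        "\n",
        "# The number of tasks in each job, read from the `'cluster'` entry.\n",
        "for job in sorted(cluster_spec.jobs):\n",
        "  print(job, cluster_spec.num_tasks(job))\n",
        "```\n",
        "\n",
        "If the reported task type or task counts do not match what you expect, the `'TF_CONFIG'` value is the first thing to check."
      ]
    },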
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "o_8uVvJb6dqq"
      },
      "source": [
        "Then, create `tf.distribute.Server`s for the workers and parameter servers one-by-one:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ZJopinmG6b2z"
      },
      "outputs": [],
      "source": [
        "# Workers need some inter_ops threads to work properly.\n",
        "# This is only needed for this notebook to demo. Real servers\n",
        "# should not need this.\n",
        "worker_config = tf.compat.v1.ConfigProto()\n",
        "worker_config.inter_op_parallelism_threads = 4\n",
        "\n",
        "for i in range(3):\n",
        "  tf.distribute.Server(\n",
        "      cluster_resolver.cluster_spec(),\n",
        "      job_name=\"worker\",\n",
        "      task_index=i,\n",
        "      config=worker_config)\n",
        "\n",
        "for i in range(2):\n",
        "  tf.distribute.Server(\n",
        "      cluster_resolver.cluster_spec(),\n",
        "      job_name=\"ps\",\n",
        "      task_index=i)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IpfCcF0g6Ao8"
      },
      "source": [
        "In real-world distributed training, instead of starting all the `tf.distribute.Server`s on the coordinator, you will be using multiple machines, and the ones that are designated as `\"worker\"`s and `\"ps\"` (parameter servers) will each run a `tf.distribute.Server`. Refer to *Clusters in the real world* section in the [Parameter server training](../../tutorials/distribute/parameter_server_training.ipynb) tutorial for more details.\n",
        "\n",
        "With everything ready, create the `ParameterServerStrategy` object:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "t45iQeBT7Us_"
      },
      "outputs": [],
      "source": [
        "strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "diNsps1MGRS6"
      },
      "source": [
        "Once you have created a strategy object, define the model,  the optimizer, and other variables, and call the Keras `Model.compile` within the `Strategy.scope` API to distribute the training. (Refer to the `Strategy.scope` API docs for more information.)\n",
        "\n",
        "If you prefer to customize your training by, for instance, defining the forward and backward passes, refer to *Training with a custom training loop* section in [Parameter server training](../../tutorials/distribute/parameter_server_training.ipynb) tutorial for more details."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "atVciNgPs0fw"
      },
      "outputs": [],
      "source": [
        "dataset = tf.data.Dataset.from_tensor_slices(\n",
        "      (features, labels)).shuffle(10).repeat().batch(64)\n",
        "\n",
        "eval_dataset = tf.data.Dataset.from_tensor_slices(\n",
        "      (eval_features, eval_labels)).repeat().batch(1)\n",
        "\n",
        "with strategy.scope():\n",
        "  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])\n",
        "  optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)\n",
        "  model.compile(optimizer, \"mse\")\n",
        "\n",
        "model.fit(dataset, epochs=5, steps_per_epoch=10)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "akZ0aaaS1vA9"
      },
      "outputs": [],
      "source": [
        "model.evaluate(eval_dataset, steps=10, return_dict=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "pXbS71XmMSoO"
      },
      "source": [
        "> **Partitioners (`tf.distribute.experimental.partitioners`)**\n",
        ">\n",
 `ParameterServerStrategy` in TensorFlow 2">
        "> `ParameterServerStrategy` in TensorFlow 2 supports variable partitioning and offers the same partitioners as TensorFlow 1, with less confusing names:\n",
        "> - `tf.compat.v1.variable_axis_size_partitioner` -> `tf.distribute.experimental.partitioners.MaxSizePartitioner`: a partitioner that keeps shards under a maximum size.\n",
        "> - `tf.compat.v1.min_max_variable_partitioner` -> `tf.distribute.experimental.partitioners.MinSizePartitioner`: a partitioner that allocates a minimum size per shard.\n",
        "> - `tf.compat.v1.fixed_size_partitioner` -> `tf.distribute.experimental.partitioners.FixedShardsPartitioner`: a partitioner that allocates a fixed number of shards."
      ]
    },
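    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a rough sketch of how these partitioners are used: a partitioner is a callable that takes a variable's shape and dtype and returns the number of partitions along each axis, and it is handed to the strategy through the `variable_partitioner` argument. The shape and shard settings below are arbitrary example values, not recommendations:\n",
        "\n",
        "```python\n",
        "import tensorflow as tf\n",
        "\n",
        "partitioners = tf.distribute.experimental.partitioners\n",
        "\n",
        "# Split each variable into at most 2 shards along its first axis.\n",
        "fixed = partitioners.FixedShardsPartitioner(num_shards=2)\n",
        "# Allocate at least 256 KiB per shard, using at most 2 shards.\n",
        "min_size = partitioners.MinSizePartitioner(\n",
        "    min_shard_bytes=256 << 10, max_shards=2)\n",
        "\n",
        "# An example variable shape (arbitrary values).\n",
        "shape = tf.TensorShape([1024, 64])\n",
        "fixed_partitions = fixed(shape, tf.float32)\n",
        "min_size_partitions = min_size(shape, tf.float32)\n",
        "print(fixed_partitions)\n",
        "print(min_size_partitions)\n",
        "\n",
        "# With a cluster resolver such as the one created earlier, the partitioner\n",
        "# would be passed to the strategy like this (not run here, because it\n",
        "# needs a running cluster):\n",
        "# strategy = tf.distribute.experimental.ParameterServerStrategy(\n",
        "#     cluster_resolver, variable_partitioner=min_size)\n",
        "```\n",
        "\n",
        "Calling a partitioner directly like this is a convenient way to check how a given variable shape would be split before starting a training job."
      ]
    },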
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Ig0-uCUbGprd"
      },
      "source": [
        "Alternatively, you can use a `MultiWorkerMirroredStrategy` object:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "xHXP8bOBGtXL"
      },
      "outputs": [],
      "source": [
        "# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.\n",
        "del os.environ['TF_CONFIG']\n",
        "strategy = tf.distribute.MultiWorkerMirroredStrategy()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "tOsmqefTGwUf"
      },
      "source": [
        "You can replace the strategy used above with a `MultiWorkerMirroredStrategy` object to perform training with this strategy.\n",
        "\n",
        "As with the `tf.estimator` APIs, since `MultiWorkerMirroredStrategy` is a multi-client strategy, there is no easy way to run distributed training in this Colab notebook. Therefore, replacing the code above with this strategy ends up running things locally. The Multi-worker training [with Keras Model.fit](../../tutorials/distribute/multi_worker_with_keras.ipynb)/[a custom training loop](../../tutorials/distribute/multi_worker_with_ctl.ipynb) tutorials demonstrate how to run multi-worker training with\n",
        " the `'TF_CONFIG'` variable set up, with two workers on a localhost in Colab. In practice, you would create multiple workers on external IP addresses/ports, and use the `'TF_CONFIG'` variable to specify the cluster configuration for each worker."
      ]
    },
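    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a minimal sketch of the per-worker configuration, the snippet below builds one `'TF_CONFIG'` value per worker for a two-worker cluster. The host names are placeholders, and the helper `make_tf_config` is hypothetical (it is not a TensorFlow API):\n",
        "\n",
        "```python\n",
        "import json\n",
        "\n",
        "# Placeholder addresses; in practice these are your real hosts and ports.\n",
        "worker_addresses = ['worker0.example.com:12345', 'worker1.example.com:12345']\n",
        "\n",
        "def make_tf_config(addresses, index):\n",
        "  # Hypothetical helper: all workers share `'cluster'`; only `'index'` differs.\n",
        "  return json.dumps({\n",
        "      'cluster': {'worker': addresses},\n",
        "      'task': {'type': 'worker', 'index': index}\n",
        "  })\n",
        "\n",
        "tf_configs = [make_tf_config(worker_addresses, i)\n",
        "              for i in range(len(worker_addresses))]\n",
        "\n",
        "for tf_config in tf_configs:\n",
        "  print(tf_config)\n",
        "\n",
        "# On worker `i`, before creating the strategy (not run here):\n",
        "# os.environ['TF_CONFIG'] = tf_configs[i]\n",
        "# strategy = tf.distribute.MultiWorkerMirroredStrategy()\n",
        "```\n",
        "\n",
        "Each worker process exports its own value, so the same training script can run unchanged on every machine."
      ]
    },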
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "917ef6135660"
      },
      "source": [
        "## Next steps"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "e76fd9d5c98c"
      },
      "source": [
        "To learn more about multi-worker distributed training with `tf.distribute.experimental.ParameterServerStrategy` and `tf.distribute.MultiWorkerMirroredStrategy` in TensorFlow 2, consider the following resources:\n",
        "\n",
        "- Tutorial: [Parameter server training with ParameterServerStrategy and Keras Model.fit/a custom training loop](../../tutorials/distribute/parameter_server_training.ipynb)\n",
        "- Tutorial: [Multi-worker training with MultiWorkerMirroredStrategy and Keras Model.fit](../../tutorials/distribute/multi_worker_with_keras.ipynb)\n",
        "- Tutorial: [Multi-worker training with MultiWorkerMirroredStrategy and a custom training loop](../../tutorials/distribute/multi_worker_with_ctl.ipynb)\n",
        "- Guide: [Distributed training with TensorFlow](../../guide/distributed_training.ipynb)\n",
        "- Guide: [Optimize TensorFlow GPU performance with the TensorFlow Profiler](../../guide/gpu_performance_analysis.ipynb)\n",
        "- Guide: [Use a GPU](../../guide/gpu.ipynb) (the Using multiple GPUs section)"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [],
      "name": "multi_worker_cpu_gpu_training.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
